package com.dbflow5.reactivestreams.query;

import com.dbflow5.config.DBFlowDatabase;
import com.dbflow5.config.FlowManager;
import com.dbflow5.database.DatabaseWrapper;
import com.dbflow5.observing.OnTableChangedObserver;
import com.dbflow5.observing.TableObserver;
import com.dbflow5.query.From;
import com.dbflow5.query.ModelQueriable;
import com.dbflow5.reactivestreams.transaction.TransactionObservable;
import com.dbflow5.transaction.Transaction;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Description: Emits when table changes occur for the related table on the [ModelQueriable].
 * If the [ModelQueriable] relates to a [Join], this can be multiple tables.
 */
public class TableChangeOnSubscribe<T, R> implements FlowableOnSubscribe<R> {

    private final ModelQueriable<T> modelQueriable;
    private final BiFunction<ModelQueriable<T>, DatabaseWrapper, R> evalFn;

    public TableChangeOnSubscribe(ModelQueriable<T> modelQueriable, BiFunction<ModelQueriable<T>, DatabaseWrapper, R> evalFn) {
        this.modelQueriable = modelQueriable;
        this.evalFn = evalFn;

        onTableChangedObserver = new OnTableChangedObserver(new ArrayList<>(associatedTables())) {
            @Override
            protected void onChanged(Set<Class<?>> tables) {
                if (!tables.isEmpty()) {
                    evaluateEmission(tables.stream().findFirst().get());
                }
            }
        };
    }

    private FlowableEmitter<R> flowableEmitter;

    private final OnTableChangedObserver onTableChangedObserver;

    private final CompositeDisposable currentTransactions = new CompositeDisposable();

    private Set<Class<?>> associatedTables() {
        Set<Class<?>> classSet = null;
        From<T> from = modelQueriable.extractFrom();
        if(from != null){
            classSet = from.associatedTables();
        }
        if(classSet == null){
            classSet = new HashSet<>(Collections.singleton(modelQueriable.table()));
        }
        return classSet;
    }

    private void evaluateEmission(Class<?> table) {
        if(table == null) {
            table =  modelQueriable.table();
        }
        if (this.flowableEmitter != null) {
            Transaction.Builder<R> builder = FlowManager.databaseForTable(table, dbFlowDatabase -> null)
            .beginTransactionAsync((Function<DatabaseWrapper, R>) databaseWrapper -> evalFn.apply(modelQueriable, databaseWrapper))
            .shouldRunInTransaction(false);

            Disposable disposable = TransactionObservable.asMaybe(builder).subscribe(r -> flowableEmitter.onNext(r));
            currentTransactions.add(disposable);
        }
    }

    public void subscribe(FlowableEmitter<R> e) {
        flowableEmitter = e;

        DBFlowDatabase db = FlowManager.getDatabaseForTable(associatedTables().stream().findFirst().get());
        // force initialize the dbr
        db.getWritableDatabase();

        TableObserver observer = db.tableObserver();
        e.setDisposable(Disposable.fromRunnable(() -> {
            observer.removeOnTableChangedObserver(onTableChangedObserver);
            currentTransactions.dispose();
        }));
        observer.addOnTableChangedObserver(onTableChangedObserver);

        // emit once on subscribe.
        evaluateEmission(modelQueriable.table());
    }

}
